AWS IAM Identity Centerの棚卸しスクリプトを書いてみる【Python,Boto3】
以下アップデートから、 AWS IAM Identity Center (旧 AWS Single-Sign On) の IDストアに対してAPIから参照・更新ができるようになっています。
今回はこれらAPIを使って、IAM Identity Center の棚卸しスクリプトを書いてみます。
IDストア情報(ユーザー, グループ)および、許可セットや割り当て情報 を Markdownファイルとして出力するPythonスクリプトです。
更新履歴
- 2023/01/19
- PermissionSetArn情報を出力に記載
- AWSアカウント一覧を出力に記載
- Markdownテーブルの余分なスペースを削除
出力イメージ
以下のような Markdownファイル を生成します。
# AWS IAM Identity Center 棚卸し - 取得開始時刻: {datetime} - 実行アカウント: {account_name} ({account_id}) ## IAM Identity Center 情報 - インスタンスARN: {instance_arn} - インスタンスストアID: {identity_store_id} ## AWSアカウント一覧 {accounts(Markdownテーブル)} ## ユーザー一覧 {users(Markdownテーブル)} ## グループ一覧 {groups(Markdownテーブル)} ## ユーザーのグループ所属情報 {group_memberships(Markdownテーブル)} ## 許可セット一覧 {permission_sets(Markdownテーブル)} ## 割り当て一覧 {assignments(Markdownテーブル)}
全体(〜ユーザー一覧)
※ 目次(TOC)部分は wikiの自動生成機能で表示
グループ一覧
ユーザーのグループ所属一覧
許可セット一覧
割り当て一覧
作ったPythonスクリプト
前提
- AWS API実行に boto3 を使っています
- Markdownテーブル作成に tabulate を使っています ( 参考 )
- [参考] スクリプト作成・テストに用いた実行環境は以下のとおりです
- OS: macOS Monterey version 12.6
- Python: 3.10.8
- boto3: 1.24.93
- tabulate: 0.9.0
Pythonスクリプト( inventory.py
)
こちらのGist に上げています(後述の補足にも同じものを記載)。
Pythonスクリプトの実行について
事前に 管理アカウント上に対して AWS APIを実行できるようにします※。 例えば以下のようなコマンドが実行成功するか確かめてください。
aws sts get-caller-identity
→ 管理アカウントのアカウントIDであることaws sso-admin list-instances
→ IDストアの情報が出てくること
※IAM Identity Centerを委任しているメンバーアカウント上では正常に動作しません。 管理アカウントへの割り当て情報を取得できないためです。
Pythonスクリプト( inventory.py
) をローカルで実行します。 Markdownファイル( inventory_{YYYY-mm-dd_HH-MM-SS}.md
)が生成されます。
以下実行例です。
$ python3 ./inventory.py # INFO:botocore.credentials:Found credentials in environment variables. # INFO:root:[start] timestamp: 2022-10-28_13-40-24 # INFO:root:# getting sso instances... # ...(略)... # INFO:root:## getting Infra_PRD_NW, IAMReadOnlyAccess assignments... # INFO:root:-> number of assignments: 113 # INFO:root:# generating report(inventory_2022-10-28_13-40-24.md)... # INFO:root:[end] $ ls ./inventory_*.md # inventory_2022-10-28_13-40-24.md $ head -n 16 ./inventory_2022-10-28_13-40-24.md # # AWS IAM Identity Center 棚卸し # # - 取得開始時刻: 2022-10-28_13-40-24 # - 実行アカウント: Payer-example (111111111111) # # ## IAM Identity Center 情報 # # - インスタンスARN: arn:aws:sso:::instance/ssoins-example # - インスタンスストアID: d-example # # ## ユーザー一覧 # # | DisplayName | UserName | UserId | # |-------------|------------------|--------------------------| # | EXAMPLE_AAA | aaa@example.com | 1234567890-aaaa-example | # | EXAMPLE_BBB | bbb@example.com | 1234567890-bbbb-example |
考慮点など
- 使っているアクションは以下のとおりです
- 「存在しない(削除した)ユーザー、グループ」に対する割り当て情報があった場合は
#DELETED({principal_id})
を記載します (スクリプト作成時に詰まったポイント) - (私が試した範囲では起きませんでしたが) APIリクエストが最大レートを超える場合は別途エラーハンドリングを実装する必要があります
- 主に sso:ListAccountAssignments を多用します (AWSアカウント ✕ 許可セット 単位で実行するため)
- 参考: IAM Identity Center throttle limits
おわりに
AWS IAM Identity Center の棚卸しスクリプトを書いてみました。
現状はマネジメントコンソール上では「だれがどのアカウントにどの権限でアクセスできるか」情報は俯瞰しづらいです。こういった棚卸しスクリプトで定期的に確認するのも一つの手です。
参考
- Boto3
- DevelopersIO
補足
Pythonスクリプト( inventory.py
)
import boto3 import logging import re from tabulate import tabulate from datetime import datetime from operator import itemgetter from itertools import product logging.basicConfig(level=logging.INFO) org_client = boto3.client('organizations') idstore_client = boto3.client('identitystore') ssoadmin_client = boto3.client('sso-admin') sts_client = boto3.client('sts') # テンプレート TEMPLATE = """# AWS IAM Identity Center 棚卸し - 取得開始時刻: {datetime} - 実行アカウント: {account_name} ({account_id}) ## IAM Identity Center 情報 - インスタンスARN: {instance_arn} - インスタンスストアID: {identity_store_id} ## AWSアカウント一覧 {accounts} ## ユーザー一覧 {users} ## グループ一覧 {groups} ## ユーザーのグループ所属一覧 {group_memberships} ## 許可セット一覧 {permission_sets} ## 割り当て一覧 {assignments} """ def generate_report(datetime_now='', instance_arn='', identity_store_id='', account_name='', account_id='', accounts='', users='', groups='', group_memberships='', permission_sets='', assignments=''): text = TEMPLATE.format( datetime=datetime_now, instance_arn=instance_arn, identity_store_id=identity_store_id, account_name=account_name, account_id=account_id, accounts=accounts, users=users, groups=groups, group_memberships=group_memberships, permission_sets=permission_sets, assignments=assignments) text_fix = re.sub('\| +', '| ', re.sub(' +\|', ' |', text)) #テーブルの余分なスペースを削除 file_path = f'inventory_{datetime_now}.md' with open(file_path, mode='w') as f: f.write(text_fix) def boto3_accounts(): accounts = [] paginator = org_client.get_paginator('list_accounts') for page in paginator.paginate(): accounts += page.get('Accounts') return accounts def boto3_users(identity_store_id): users = [] paginator = idstore_client.get_paginator('list_users') for page in paginator.paginate(IdentityStoreId=identity_store_id): users += page.get('Users') return users def boto3_groups(identity_store_id): groups = [] paginator = idstore_client.get_paginator('list_groups') for page in paginator.paginate(IdentityStoreId=identity_store_id): groups += page.get('Groups') return groups def boto3_group_memberships(identity_store_id, group_id): memberships = [] paginator = idstore_client.get_paginator( 'list_group_memberships') for page in paginator.paginate(IdentityStoreId=identity_store_id, GroupId=group_id): memberships += page.get('GroupMemberships') return memberships def boto3_permission_sets(instance_arn): permission_sets = [] paginator = ssoadmin_client.get_paginator('list_permission_sets') for page in paginator.paginate(InstanceArn=instance_arn): permission_sets += page.get('PermissionSets') return permission_sets def boto3_account_assignment(instance_arn, account_id, permission_set_arn): assignments = [] paginator = ssoadmin_client.get_paginator('list_account_assignments') for page in paginator.paginate(InstanceArn=instance_arn, AccountId=account_id, PermissionSetArn=permission_set_arn): assignments += page.get('AccountAssignments') return assignments def _principal_name(assignment, user_id_to_name, group_id_to_name): principal_type = assignment.get('PrincipalType') principal_id = assignment.get('PrincipalId') if principal_type == 'USER' and user_id_to_name.get(principal_id): return user_id_to_name.get(principal_id) elif principal_type == 'GROUP' and group_id_to_name.get(principal_id): return group_id_to_name.get(principal_id) else: return f'#DELETED({principal_id})' def main(): # 実行開始時刻の取得 datetime_now = datetime.now().strftime('%Y-%m-%d_%H-%M-%S') logging.info(f'[start] timestamp: {datetime_now}') # (事前準備) SSOインスタンス情報の取得 logging.info('# getting sso instances...') instances = ssoadmin_client.list_instances().get('Instances') identity_store_id = instances[0].get('IdentityStoreId') instance_arn = instances[0].get('InstanceArn') logging.info(f'-> instance store id: {identity_store_id}') logging.info(f'-> instance arn: {instance_arn}') # アカウントID, アカウント名の取得 logging.info('# getting accounts...') accounts = boto3_accounts() account_id_to_name = dict([ (a.get('Id'), a.get('Name')) for a in accounts ]) accounts_tabulate = tabulate( sorted([(a.get('Name'), a.get('Id')) for a in accounts], key=itemgetter(0)), headers=['AccountName', 'AccountId'], tablefmt='github' ) logging.info(f'-> number of accounts: {len(account_id_to_name)}') # 処理を実行しているアカウント情報の取得 exec_account_id = sts_client.get_caller_identity().get('Account') exec_account_name = account_id_to_name.get(exec_account_id) logging.info(f'-> exec account: {exec_account_name}({exec_account_id})') # ユーザー一覧の取得 logging.info('# getting sso users...') users = boto3_users(identity_store_id) user_id_to_name = dict([ (u.get('UserId'), u.get('DisplayName')) for u in users ]) users_tabulate = tabulate( sorted([(u.get('DisplayName'), u.get('UserName'), u.get('UserId')) for u in users], key=itemgetter(0,1)), headers=['DisplayName', 'UserName', 'UserId'], tablefmt='github' ) logging.info(f'-> number of users: {len(user_id_to_name)}') # グループ一覧の取得 logging.info('# getting sso groups...') groups = boto3_groups(identity_store_id) group_id_to_name = dict( [(g.get('GroupId'), g.get('DisplayName')) for g in groups]) groups_tabulate = tabulate( sorted( [(g.get('DisplayName'), g.get('Description'), g.get('GroupId')) for g in groups], key=itemgetter(0) ), headers=['DisplayName', 'Description', 'GroupId'], tablefmt='github' ) logging.info(f'-> number of groups: {len(group_id_to_name)}') # ユーザーのグループ所属一覧の取得 logging.info('# getting all group memberships...') all_memberships = [] for group_id, group_name in group_id_to_name.items(): memberships = boto3_group_memberships(identity_store_id, group_id) all_memberships += [ (group_name, user_id_to_name.get(m.get('MemberId').get('UserId'))) for m in memberships ] all_memberships_tabulate = tabulate( sorted(all_memberships, key=itemgetter(0, 1)), headers=['GroupName', 'UserName'], tablefmt='github' ) logging.info(f'-> number of memberships: {len(all_memberships)}') # 許可セット一覧の取得 logging.info('# getting permission sets...') permission_set_arns = boto3_permission_sets(instance_arn) permission_sets = [] permission_set_arn_to_name = {} for arn in permission_set_arns: permission_set = ssoadmin_client.describe_permission_set( InstanceArn=instance_arn, PermissionSetArn=arn).get('PermissionSet') permission_sets.append(permission_set) permission_set_arn_to_name[arn] = permission_set.get('Name') permission_sets_tabulate = tabulate( sorted( [(ps.get('Name'), ps.get('Description'), ps.get('PermissionSetArn')) for ps in permission_sets], key=itemgetter(0) ), headers=['PermissionSetName', 'Description', 'PermissionSetArn'], tablefmt='github' ) logging.info( f'-> number of permission sets: {len(permission_set_arn_to_name)}') # 割り当て一覧の取得 (アカウント名 x プリンシパル名 x プリンシパル種別 x 許可セット名) logging.info('# getting all assignments...') all_assignments = [] for account_id, permission_set_arn in product(account_id_to_name.keys(), permission_set_arns): account_name = account_id_to_name.get(account_id) permission_set_name = permission_set_arn_to_name.get( permission_set_arn) logging.info( f'## getting {account_name}, {permission_set_name} assignments...') assignments = boto3_account_assignment( instance_arn, account_id, permission_set_arn) for a in assignments: principal_type = a.get('PrincipalType') principal_name = _principal_name( a, user_id_to_name, group_id_to_name) all_assignments.append( (account_name, principal_type, principal_name, permission_set_name)) all_assignments_tabulate = tabulate( sorted(all_assignments, key=itemgetter(0, 1, 2)), headers=['AccountName', 'PrincipalType', 'PrincipalName', 'PermissionSetName'], tablefmt='github' ) logging.info(f'-> number of assignments: {len(all_assignments)}') # 出力 logging.info(f'# generating report(inventory_{datetime_now}.md)...') generate_report( datetime_now=datetime_now, instance_arn=instance_arn, identity_store_id=identity_store_id, account_name=exec_account_name, account_id=exec_account_id, accounts=accounts_tabulate, users=users_tabulate, groups=groups_tabulate, group_memberships=all_memberships_tabulate, permission_sets=permission_sets_tabulate, assignments=all_assignments_tabulate ) logging.info('[end]') main()